package custom;

import beans.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;

/**
 * A stateful map operator written as an exercise in the keyed-state API.
 *
 * <p>{@link #open(Configuration)} obtains one handle for each of four state
 * kinds (value, list, map and reducing) from the runtime context, and
 * {@link #map(SensorReading)} touches every one of them for each incoming
 * reading. The value emitted downstream is the length of the reading's id.
 *
 * @author lvbingbing
 * @date 2022-01-05 22:30
 */
public class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {

    private static final long serialVersionUID = 2260812562327726776L;

    /** Holds the hash code of the most recently mapped reading. */
    private transient ValueState<Integer> myValueState;

    /** Accumulates the ids of the readings mapped so far. */
    private transient ListState<String> myListState;

    /** Scratch map state; every entry written by {@code map} is removed again. */
    private transient MapState<String, Double> myMapState;

    /** Reducing state whose reducer always keeps the first of the two values. */
    private transient ReducingState<SensorReading> myReducingState;

    /**
     * Registers the four state descriptors and stores the resulting handles.
     *
     * @param parameters configuration handed in by the framework; not read here
     * @throws Exception declared by the overridden method
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Integer> valueDescriptor =
                new ValueStateDescriptor<>("myValueState", Integer.class);
        myValueState = getRuntimeContext().getState(valueDescriptor);

        ListStateDescriptor<String> listDescriptor =
                new ListStateDescriptor<>("myListState", String.class);
        myListState = getRuntimeContext().getListState(listDescriptor);

        MapStateDescriptor<String, Double> mapDescriptor =
                new MapStateDescriptor<>("myMapState", String.class, Double.class);
        myMapState = getRuntimeContext().getMapState(mapDescriptor);

        // The reducer discards the second argument, so the first value wins.
        ReducingStateDescriptor<SensorReading> reducingDescriptor =
                new ReducingStateDescriptor<>("myReducingState", (first, second) -> first, SensorReading.class);
        myReducingState = getRuntimeContext().getReducingState(reducingDescriptor);
    }

    /**
     * Exercises every state handle once and returns the length of the reading's id.
     *
     * @param sensorReading the incoming element
     * @return the number of characters in {@code sensorReading.getId()}
     * @throws Exception if any state access fails
     */
    @Override
    public Integer map(SensorReading sensorReading) throws Exception {
        // Value state: overwrite with the hash code of the current reading.
        myValueState.update(sensorReading.hashCode());

        // List state: append the id, then print everything stored so far.
        // NOTE(review): getId() is read once and reused below; assumes it is a plain getter.
        String sensorId = sensorReading.getId();
        myListState.add(sensorId);
        myListState.get().forEach(storedId -> System.out.println(storedId));

        // Map state: write and immediately remove a sample entry.
        myMapState.put("1", 12.0);
        myMapState.remove("1");

        // Reducing state: feed the reading through the reducer.
        myReducingState.add(sensorReading);

        // Finally wipe the map state.
        myMapState.clear();

        return sensorId.length();
    }
}
